package io.mqttpush.mqttserver.service;

import java.security.SecureRandom;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.mqttpush.mqttserver.beans.ConstantBean;
import io.mqttpush.mqttserver.beans.ServiceBeans;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.concurrent.DefaultEventExecutor;

/**
 * 维护管理着主题
 * 
 * @author tianzhenjiu
 *
 */
public class TopicManager {

	Logger logger = LoggerFactory.getLogger(getClass());
	/**
	 * 记录着主题的所有订阅者和订阅要求的qos
	 */
	final Map<String, Map<String,MqttQoS>> topicDevices;

	final SecureRandom random = new SecureRandom();

	ChannelUserService channelUserService;

	Map<String, ChannelGroup> mapChannelGroup;

	
	
	
	public TopicManager() {

		channelUserService = ServiceBeans.getInstance().getChannelUserService();

		mapChannelGroup = new ConcurrentHashMap<>();
		mapChannelGroup.putIfAbsent(ConstantBean.adminRecivTopic, new DefaultChannelGroup(new DefaultEventExecutor()));

		topicDevices = new ConcurrentHashMap<>();
		initTopc();
	}

	/**
	 * 必须在亲缘线程执行
	 * 
	 * @param deviceId
	 * @param topicname
	 * @param mqttQoS
	 */
	public void subscribe(String deviceId, String topicname, MqttQoS mqttQoS) {

		Channel channel = channelUserService.channel(deviceId);
		
		if (channel == null || !channel.isActive()) {
			logger.warn("注意设备号对于的channel没有上线" + deviceId);
		}

		if (!topicDevices.containsKey(topicname)) {

//			logger.warn("订阅失败，订阅了无效的主题");
//			return;
			
			if(topicname.startsWith("/root")) {
				Map<String, MqttQoS> topicsA=new HashMap<>();
				topicDevices.putIfAbsent(topicname,topicsA );
			}
		}
		
		topicDevices.get(topicname).put(deviceId, mqttQoS);
		logger.info(deviceId + "订阅成功" + topicname + mqttQoS);

	}

	/**
	 * 处理订阅
	 * 
	 * 必须在亲缘线程执行
	 * 
	 * @param channel
	 * @param topicname
	 * @param mqttQoS
	 */
	public void subscribe(Channel channel, String topicname, MqttQoS mqttQoS) {

		if (topicname == null) {
			logger.warn("主题怎么能为空?");
			return;
		}

		String deviceId = channelUserService.deviceId(channel);

		if (deviceId == null) {

			logger.warn("订阅失败，怎么会出现为空的设备号?");
			return;
		}

		
		
		subscribe(deviceId, topicname, mqttQoS);

	}

	/**
	 * 取消订阅 必须在亲缘线程执行
	 * 
	 * @param deviceId
	 * @param topName
	 */
	public void unscribe(String deviceId, String topicname) {

		Channel channel = channelUserService.channel(deviceId);
		if (channel == null || (!channel.isActive())) {
			logger.warn("取消订阅失败，取消订阅的时候 必须channnel在线");
			return;
		}
		
		if (topicname == null) {
			logger.warn("取消订阅失败,主题怎么能为空?");
			return;
		}
		
		MqttQoS mqttQoS=topicDevices.get(topicname).remove(deviceId);
		if(mqttQoS!=null) {
			logger.info(deviceId+"取消订阅成功"+topicname);
		}else {
			logger.info(deviceId+"取消订阅失败"+topicname);
		}
		
	}

	/**
	 * 初始化三个订阅用的主题
	 */
	public void initTopc() {

	
		Map<String, MqttQoS> topicsA=new HashMap<>();
		topicDevices.putIfAbsent("/root/topicA",topicsA );

		Map<String, MqttQoS> topicsB=new HashMap<>();
		topicDevices.putIfAbsent("/root/topicB",topicsB );
		
		Map<String, MqttQoS> topicsC=new HashMap<>();
		topicDevices.putIfAbsent("/root/topicC",topicsC );
	}

	/**
	 * 创建一个topic
	 * @param topicName
	 * @param members
	 * @return
	 */
	public boolean createTopic(String topicName,int members) {
		
		if(topicDevices.containsKey(topicName)) {
			return false;
		}
		Map<String, MqttQoS> topicsC=new HashMap<>();
		
		return topicDevices.putIfAbsent("/root/topicC",topicsC )==null;
	}
	/**
	 * 根据主题执行 action
	 * 
	 * @param topicName
	 * @param action
	 */
	public void channelsAction(String topicName, BiConsumer<String, MqttQoS> action) {
		
		if (topicName == null) {
			logger.warn("主题怎么能为空?");
			return;
		}
		
		if (action == null) {
			logger.warn("action怎么能为空?");
			return;
		}
		
		
		if (!topicDevices.containsKey(topicName)) {
			logger.debug("没有这个主题"+topicName);
			return;
		}

		topicDevices.get(topicName).forEach(action);
	}

	public void  channelGroupSubscribe(String topicName,Channel channel) {
		
		if (topicName == null) {
			logger.warn("主题怎么能为空?");
			return;
		}
		
		if (channel == null) {
			logger.warn("channel怎么能为空?");
			return;
		}
		
		ChannelGroup channelGroup = null;
		
		if (mapChannelGroup.containsKey(topicName)) {
			channelGroup = mapChannelGroup.get(topicName);
		}
		
		if(channelGroup!=null) {
			channelGroup.add(channel);
		}
		
	}
	/**
	 * 直接群发bytebuf
	 * 
	 * @param topicName
	 * @param byteBuf
	 */
	public void channelGroupWrite(String topicName, Object writeObj) {

		
		if (topicName == null) {
			logger.warn("主题怎么能为空?");
			return;
		}
		
		if (writeObj == null) {
			logger.warn("writeObj怎么能为空?");
			return;
		}

		
		ChannelGroup channelGroup = null;
		
		if (mapChannelGroup.containsKey(topicName)) {
			channelGroup = mapChannelGroup.get(topicName);
		}
		if (channelGroup != null) {
				channelGroup.writeAndFlush(writeObj);
		} else {
			logger.warn("发送失败,这个主题不存在任何channel" + topicName);
		}
	
		
	}


}
